691884c7f373502a6e71f23c503349579ccd4690,cdap-app-templates/cdap-etl/cdap-etl-lib/src/main/java/co/cask/cdap/templates/etl/batch/sinks/TimePartitionedFileSetDatasetAvroSink.java,TimePartitionedFileSetDatasetAvroSink,configurePipeline,#ETLStage#PipelineConfigurer#,66
Before Change
public void configurePipeline(ETLStage stageConfig, PipelineConfigurer pipelineConfigurer) {
// if the base path is provided then we should try to create the fileset here
if (!Strings.isNullOrEmpty(stageConfig.getProperties().get(BASE_PATH))) {
String tpfsName = stageConfig.getProperties().get(TPFS_NAME);
Preconditions.checkArgument(!Strings.isNullOrEmpty(tpfsName), "TimePartitionedFileSet name must be given.");
pipelineConfigurer.createDataset(tpfsName, TimePartitionedFileSet.class.getName(), FileSetProperties.builder()
.setBasePath(stageConfig.getProperties().get(BASE_PATH))
.setInputFormat(AvroKeyInputFormat.class)
.setOutputFormat(AvroKeyOutputFormat.class)
.setEnableExploreOnCreate(true)
.setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
.setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
.setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")
.setTableProperty("avro.schema.literal", (stageConfig.getProperties().get(SCHEMA)))
.build());
}
}
After Change
@Override
public void configurePipeline(ETLStage stageConfig, PipelineConfigurer pipelineConfigurer) {
// if the base path is provided then we should try to create the fileset here
Map<String, String> properties = stageConfig.getProperties();
if (!Strings.isNullOrEmpty(properties.get(Properties.TimePartitionedFileSetDataset.BASE_PATH))) {
String tpfsName = properties.get(Properties.TimePartitionedFileSetDataset.TPFS_NAME);
Preconditions.checkArgument(!Strings.isNullOrEmpty(tpfsName), "TimePartitionedFileSet name must be given.");
pipelineConfigurer.createDataset(tpfsName, TimePartitionedFileSet.class.getName(), FileSetProperties.builder()